package com.huan.filter.udf;

import com.huan.filter.vo.Person;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo application showing a "rich" user-defined function in a Flink streaming job.
 * <p>
 * Flink exposes every UDF as an interface or an abstract class (for example
 * {@code MapFunction}, {@code FilterFunction}, {@code ReduceFunction}), so a user can
 * write a custom function class that implements or extends the corresponding type.
 * Here {@link NameRichMapFunction} extends {@link RichMapFunction} and additionally
 * overrides the {@code open}/{@code close} hooks and reads the {@link RuntimeContext}.
 *
 * @author huan.fu
 * @date 2023/9/18 - 22:43
 */
public class RichMapFunctionApplication {

    /**
     * Builds a stream from three fixed {@link Person} elements, maps each one to its
     * name through {@link NameRichMapFunction}, prints the result and then runs the
     * job under the name {@code "map operation"}.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from {@code StreamExecutionEnvironment#execute}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source -> map(Person -> name) -> print sink.
        env.fromElements(
                        new Person(1, "张三", 20),
                        new Person(2, "李四", 25),
                        new Person(3, "王五", 30)
                )
                .map(new NameRichMapFunction())
                .print();

        // Nothing runs until execute() is called on the environment.
        env.execute("map operation");
    }

    /**
     * Rich map function that turns a {@link Person} into its name and writes a line to
     * standard output from both the {@code open} and the {@code close} hook.
     */
    public static class NameRichMapFunction extends RichMapFunction<Person, String> {

        /**
         * Prints the job id, the index of this subtask and the task name (with subtask
         * information), all taken from the {@link RuntimeContext}.
         *
         * @param parameters configuration handed in by the framework; not read here
         * @throws Exception declared by the overridden method; not thrown explicitly here
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            RuntimeContext runtime = getRuntimeContext();
            String id = runtime.getJobId().toString();
            int subtaskIndex = runtime.getIndexOfThisSubtask();
            String subtaskName = runtime.getTaskNameWithSubtasks();

            StringBuilder message = new StringBuilder();
            message.append("任务jobId:[").append(id)
                    .append("], 子任务编号:[").append(subtaskIndex)
                    .append("] 子任务名:[").append(subtaskName)
                    .append("]调用 open方法");
            System.out.println(message.toString());
        }

        /**
         * Converts one element of the stream.
         *
         * @param person the incoming element
         * @return the value of {@code person.getName()}
         * @throws Exception declared by the overridden method; not thrown explicitly here
         */
        @Override
        public String map(Person person) throws Exception {
            return person.getName();
        }

        /**
         * Prints a fixed message announcing that {@code close} has been invoked.
         *
         * @throws Exception declared by the overridden method; not thrown explicitly here
         */
        @Override
        public void close() throws Exception {
            System.out.println("调用 close 方法");
        }
    }

}